package com.bawei.persona.realtime.app.dws;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * Shanghai Big Data Academy
 * Project planning and management: Li Jian
 * Technical guidance and requirements analysis: Guo Xun
 * Programming: Chu Zhigao
 *
 * <p>Flink SQL test job. It registers a Kafka-backed table named {@code test}
 * over the {@code dwd_page_log} topic, projects the fields of the nested
 * {@code common} row together with {@code ts}, converts the result into an
 * append-only {@link DataStream} of {@code BeanV} and prints every record to
 * standard output with the prefix {@code >>>>}.
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */
public class SQLTEST {

    /**
     * DDL that registers the Kafka source table {@code test}.
     *
     * <ul>
     *   <li>{@code common} and {@code page} are nested ROW columns read from the JSON payload.</li>
     *   <li>{@code ts1} is a computed column derived from {@code ts / 1000}; this presumably means
     *       {@code ts} is an epoch timestamp in milliseconds - confirm against the producer.</li>
     *   <li>The watermark on {@code ts1} trails the observed value by 5 seconds.</li>
     *   <li>Reading starts from the earliest offset; the JSON format is configured not to fail on
     *       missing fields and to ignore parse errors.</li>
     * </ul>
     */
    private static final String SOURCE_TABLE_DDL = "CREATE TABLE test (\n" +
            "    common ROW<ar STRING,uid STRING,os STRING,ch STRING,is_new STRING,md STRING, mid STRING, vc STRING,ba STRING>,\n" +
            "    page  ROW<page_id STRING , during_time BIGINT,last_page_id STRING>,\n" +
            "    ts BIGINT,\n" +
            "    ts1 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
            "    WATERMARK FOR ts1 AS ts1 - INTERVAL '5' SECOND \n" +
            ") WITH (\n" +
            "   'connector' = 'kafka', -- 使用 kafka connector\n" +
            "    'topic' = 'dwd_page_log', -- kafka主题\n" +
            "    'scan.startup.mode' = 'earliest-offset', -- 偏移量\n" +
            "    'properties.group.id' = 'group1', -- 消费者组\n" +
            "    'properties.bootstrap.servers' = 'hadoop102:9092', \n" +
            "    'format' = 'json', -- 数据源格式为json\n" +
            "    'json.fail-on-missing-field' = 'false',\n" +
            "    'json.ignore-parse-errors' = 'true'\n" +
            ")";

    /**
     * Query that flattens the nested {@code common} row of table {@code test} into top-level
     * columns and additionally selects {@code ts}.
     */
    private static final String FLATTEN_COMMON_QUERY = "select  common.ar as ar,common.uid as uid ,common.os as os ,common.ch as ch ,common.is_new as is_new,common.md as md,common.mid as mid , common.vc as vc  ,common.ba  as ba, ts from test";

    /**
     * Builds and runs the test job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Kafka source table; the result of the DDL statement is not needed.
        tableEnv.executeSql(SOURCE_TABLE_DDL);

        Table flattened = tableEnv.sqlQuery(FLATTEN_COMMON_QUERY);

        // BeanV is declared outside this file; presumably its fields match the selected
        // column names (ar, uid, os, ch, is_new, md, mid, vc, ba, ts) - confirm.
        // NOTE(review): toAppendStream is deprecated in newer Flink releases in favour of
        // toDataStream - consider migrating once the project's Flink version is confirmed.
        DataStream<BeanV> beanStream = tableEnv.toAppendStream(flattened, BeanV.class);
        beanStream.print(">>>>");

        env.execute("hello ");
    }
}
